{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Online Machine Learning on a StreamPipes data stream\n",
    "The last tutorial ([Getting live data from the StreamPipes data stream](../3-getting-live-data-from-the-streampipes-data-stream)) showed how to connect to a data stream. With that approach, you could already apply Online Machine Learning by training a model on each incoming event in the `onEvent` method. However, the StreamPipes client also offers an easier way to do this with the [River library](https://riverml.xyz) for Online Machine Learning. Let's have a look at it now."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from streampipes.client import StreamPipesClient\n",
    "from streampipes.client.config import StreamPipesClientConfig\n",
    "from streampipes.client.credential_provider import StreamPipesApiKeyCredentials"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
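    "# you can install all required Python dependencies for this tutorial by executing the following command\n",
    "# (rendering the tree images additionally requires the Graphviz system binaries)\n",
    "%pip install river streampipes graphviz"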
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"SP_USERNAME\"] = \"admin@streampipes.apache.org\"\n",
    "os.environ[\"SP_API_KEY\"] = \"XXX\"\n",
    "\n",
    "# Use this if you work locally:\n",
    "os.environ[\"BROKER-HOST\"] = \"localhost\"\n",
    "os.environ[\"KAFKA-PORT\"] = \"9094\"  # When using Kafka as message broker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2023-01-27 16:04:24,784 - streampipes.client.client - [INFO] - [client.py:128] [_set_up_logging] - Logging successfully initialized with logging level INFO.\n"
     ]
    }
   ],
   "source": [
    "client_config = StreamPipesClientConfig(\n",
    "    credential_provider=StreamPipesApiKeyCredentials(),\n",
    "    host_address=\"localhost\",\n",
    "    port=80,\n",
    "    https_disabled=True,\n",
    ")\n",
    "client = StreamPipesClient(client_config=client_config)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2023-01-27 16:04:28,212 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>element_id</th>\n",
       "      <th>name</th>\n",
       "      <th>description</th>\n",
       "      <th>icon_url</th>\n",
       "      <th>app_id</th>\n",
       "      <th>includes_assets</th>\n",
       "      <th>includes_locales</th>\n",
       "      <th>internally_managed</th>\n",
       "      <th>measurement_object</th>\n",
       "      <th>index</th>\n",
       "      <th>...</th>\n",
       "      <th>dom</th>\n",
       "      <th>rev</th>\n",
       "      <th>num_transport_protocols</th>\n",
       "      <th>num_measurement_capability</th>\n",
       "      <th>num_application_links</th>\n",
       "      <th>num_included_assets</th>\n",
       "      <th>num_connected_to</th>\n",
       "      <th>num_category</th>\n",
       "      <th>num_event_properties</th>\n",
       "      <th>num_included_locales</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>sp:spdatastream:xboBFK</td>\n",
       "      <td>Test</td>\n",
       "      <td></td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>False</td>\n",
       "      <td>True</td>\n",
       "      <td>None</td>\n",
       "      <td>0</td>\n",
       "      <td>...</td>\n",
       "      <td>None</td>\n",
       "      <td>5-558c861debc745e1ebae29a266a8bdb9</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>7</td>\n",
       "      <td>0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>urn:streampipes.apache.org:eventstream:Wgyrse</td>\n",
       "      <td>Test File</td>\n",
       "      <td></td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>False</td>\n",
       "      <td>True</td>\n",
       "      <td>None</td>\n",
       "      <td>0</td>\n",
       "      <td>...</td>\n",
       "      <td>None</td>\n",
       "      <td>4-66548b6b84287011b7cec0876ef82baf</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>2</td>\n",
       "      <td>0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>2 rows × 22 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "                                      element_id       name description  \\\n",
       "0                         sp:spdatastream:xboBFK       Test               \n",
       "1  urn:streampipes.apache.org:eventstream:Wgyrse  Test File               \n",
       "\n",
       "  icon_url app_id  includes_assets  includes_locales  internally_managed  \\\n",
       "0     None   None            False             False                True   \n",
       "1     None   None            False             False                True   \n",
       "\n",
       "  measurement_object  index  ...   dom                                 rev  \\\n",
       "0               None      0  ...  None  5-558c861debc745e1ebae29a266a8bdb9   \n",
       "1               None      0  ...  None  4-66548b6b84287011b7cec0876ef82baf   \n",
       "\n",
       "  num_transport_protocols num_measurement_capability  num_application_links  \\\n",
       "0                       1                          0                      0   \n",
       "1                       1                          0                      0   \n",
       "\n",
       "   num_included_assets  num_connected_to  num_category  num_event_properties  \\\n",
       "0                    0                 0             0                     7   \n",
       "1                    0                 0             0                     2   \n",
       "\n",
       "   num_included_locales  \n",
       "0                     0  \n",
       "1                     0  \n",
       "\n",
       "[2 rows x 22 columns]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "client.dataStreamApi.all().to_pandas()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How to use Online Machine Learning with StreamPipes\n",
    "After configuring the client as usual, we can start with the new part.\n",
    "The approach is straightforward, and you can start with the ML part in just three steps:\n",
    "1. Create a River pipeline containing the preprocessing steps and the model of your choice.\n",
    "2. Configure the `OnlineML` wrapper to fit your model and pass it the client and the IDs of the required data streams.\n",
    "3. Start the wrapper and let the learning begin.\n",
    "\n",
    "Starting the wrapper launches a StreamPipesFunction that trains the model on each new event. It also creates an output data stream that sends the model's predictions back to StreamPipes. This output stream appears when you create a new pipeline and can be used like any other data source, e.g., to store the predictions in the Data Lake.\n",
    "You can pause and resume the training with the `set_learning` method. To stop the whole function, use the `stop` method. If you want to delete the output stream entirely, go to the `Pipeline Element Installer` in StreamPipes and uninstall it.\n",
    "\n",
    "Now let's take a look at some examples. To execute them, create an adapter for the `Machine Data Simulator`, select the `flowrate` sensor, and replace the stream ID `sp:spdatastream:xboBFK` in the examples with the ID of your new stream. You can look up the ID in the `element_id` column of the data stream overview above."
   ]
  },
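  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before connecting to a live stream, you can try out a River pipeline offline to get a feeling for what happens with each event. The next cell is a minimal sketch under the following assumptions: the events are hand-made dictionaries shaped like the `flowrate` stream (all values are made up), and the model is updated event by event with River's `predict_one` and `learn_one` methods. The exact order in which `OnlineML` predicts and learns is not covered in this tutorial and may differ from this sketch."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from river import cluster, compose, preprocessing\n",
    "\n",
    "# Same pipeline as in the KMeans example below\n",
    "model = compose.Pipeline(\n",
    "    (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n",
    "    (\"scale\", preprocessing.StandardScaler()),\n",
    "    (\"k_means\", cluster.KMeans(n_clusters=2)),\n",
    ")\n",
    "\n",
    "# Hypothetical events shaped like the flowrate stream; all values are made up\n",
    "events = [\n",
    "    {\"timestamp\": 1, \"sensorId\": \"flowrate01\", \"mass_flow\": 5.1, \"volume_flow\": 2.3, \"temperature\": 45.2, \"density\": 40.3, \"sensor_fault_flags\": False},\n",
    "    {\"timestamp\": 2, \"sensorId\": \"flowrate01\", \"mass_flow\": 4.8, \"volume_flow\": 2.1, \"temperature\": 46.0, \"density\": 40.9, \"sensor_fault_flags\": False},\n",
    "    {\"timestamp\": 3, \"sensorId\": \"flowrate01\", \"mass_flow\": 9.7, \"volume_flow\": 6.5, \"temperature\": 88.1, \"density\": 45.5, \"sensor_fault_flags\": True},\n",
    "]\n",
    "\n",
    "for event in events:\n",
    "    # Predict with the current model state, then update the model with the same event\n",
    "    cluster_id = model.predict_one(event)\n",
    "    model.learn_one(event)\n",
    "    print(f\"timestamp={event['timestamp']} -> cluster {cluster_id}\")"
   ]
  },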
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## KMeans"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2023-01-27 16:04:35,599 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n",
      "2023-01-27 16:04:35,599 - streampipes.functions.function_handler - [INFO] - [function_handler.py:64] [initializeFunctions] - Create output data stream \"sp:spdatastream:cwKPoo\" for the function \"65cf8b86-bcdf-433e-a1c7-3e920eab55d0\"\n",
      "2023-01-27 16:04:37,766 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n",
      "2023-01-27 16:04:37,767 - streampipes.functions.function_handler - [INFO] - [function_handler.py:78] [initializeFunctions] - Using NatsBroker for RiverFunction\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222\n",
      "2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222\n",
      "2023-01-27 16:04:37,792 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:58] [createSubscription] - Subscribed to stream: sp:spdatastream:xboBFK\n"
     ]
    }
   ],
   "source": [
    "from river import cluster, compose, preprocessing\n",
    "from streampipes.function_zoo.river_function import OnlineML\n",
    "from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
    "\n",
    "k_means = compose.Pipeline(\n",
    "    (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n",
    "    (\"scale\", preprocessing.StandardScaler()),\n",
    "    (\"k_means\", cluster.KMeans(n_clusters=2)),\n",
    ")\n",
    "\n",
    "clustering = OnlineML(\n",
    "    client=client, stream_ids=[\"sp:spdatastream:xboBFK\"], model=k_means, prediction_type=RuntimeType.INTEGER.value\n",
    ")\n",
    "clustering.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "clustering.set_learning(False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2023-01-27 16:04:57,303 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK\n",
      "2023-01-27 16:04:57,304 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:cwKPoo\n"
     ]
    }
   ],
   "source": [
    "clustering.stop()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## HoeffdingTreeRegressor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pickle\n",
    "from river import compose, tree\n",
    "from streampipes.function_zoo.river_function import OnlineML\n",
    "from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
    "\n",
    "hoeffding_tree = compose.Pipeline(\n",
    "    (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n",
    "    (\"hoeffding_tree\", tree.HoeffdingTreeRegressor(grace_period=5)),\n",
    ")\n",
    "\n",
    "\n",
    "def draw_tree(self, event, streamId):\n",
    "    \"\"\"Draw the tree and save the image.\"\"\"\n",
    "    if self.learning:\n",
    "        # self.model[1] is the second step of the pipeline, i.e., the tree itself\n",
    "        if self.model[1].n_nodes is not None:\n",
    "            self.model[1].draw().render(\"hoeffding_tree\", format=\"png\", cleanup=True)\n",
    "\n",
    "\n",
    "def save_model(self):\n",
    "    \"\"\"Save the trained model.\"\"\"\n",
    "    with open(\"hoeffding_tree.pkl\", \"wb\") as f:\n",
    "        pickle.dump(self.model, f)\n",
    "\n",
    "\n",
    "regressor = OnlineML(\n",
    "    client=client,\n",
    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
    "    model=hoeffding_tree,\n",
    "    prediction_type=RuntimeType.FLOAT.value,\n",
    "    supervised=True,\n",
    "    target_label=\"temperature\",\n",
    "    on_event=draw_tree,\n",
    "    on_stop=save_model,\n",
    ")\n",
    "regressor.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "regressor.set_learning(False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "regressor.stop()"
   ]
  },
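  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `save_model` function passed as `on_stop` pickles the whole pipeline to `hoeffding_tree.pkl` once the function is stopped. As a sketch, assuming `regressor.stop()` has been executed and the file exists in the current working directory, you can load the trained pipeline again and use it outside StreamPipes. The event below is hypothetical (the values are made up) and leaves out the target `temperature`, since that is what the model predicts."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pickle\n",
    "\n",
    "# Load the pipeline that save_model() wrote when regressor.stop() was called\n",
    "with open(\"hoeffding_tree.pkl\", \"rb\") as f:\n",
    "    loaded_model = pickle.load(f)\n",
    "\n",
    "# Hypothetical flowrate event without the target label; all values are made up\n",
    "event = {\"timestamp\": 1, \"sensorId\": \"flowrate01\", \"mass_flow\": 5.1, \"volume_flow\": 2.3, \"density\": 40.3, \"sensor_fault_flags\": False}\n",
    "\n",
    "# sensorId and timestamp are dropped by the Discard step inside the pipeline\n",
    "print(loaded_model.predict_one(event))"
   ]
  },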
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ExtremelyFastDecisionTreeClassifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pickle\n",
    "from river import compose, tree\n",
    "from streampipes.function_zoo.river_function import OnlineML\n",
    "from streampipes.functions.utils.data_stream_generator import RuntimeType\n",
    "\n",
    "decision_tree = compose.Pipeline(\n",
    "    (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n",
    "    (\"decision_tree\", tree.ExtremelyFastDecisionTreeClassifier(grace_period=5)),\n",
    ")\n",
    "\n",
    "\n",
    "def draw_tree(self, event, streamId):\n",
    "    \"\"\"Draw the tree and save the image.\"\"\"\n",
    "    if self.learning:\n",
    "        # self.model[1] is the second step of the pipeline, i.e., the tree itself\n",
    "        if self.model[1].n_nodes is not None:\n",
    "            self.model[1].draw().render(\"decision_tree\", format=\"png\", cleanup=True)\n",
    "\n",
    "\n",
    "def save_model(self):\n",
    "    \"\"\"Save the trained model.\"\"\"\n",
    "    with open(\"decision_tree.pkl\", \"wb\") as f:\n",
    "        pickle.dump(self.model, f)\n",
    "\n",
    "\n",
    "classifier = OnlineML(\n",
    "    client=client,\n",
    "    stream_ids=[\"sp:spdatastream:xboBFK\"],\n",
    "    model=decision_tree,\n",
    "    prediction_type=RuntimeType.BOOLEAN.value,\n",
    "    supervised=True,\n",
    "    target_label=\"sensor_fault_flags\",\n",
    "    on_event=draw_tree,\n",
    "    on_stop=save_model,\n",
    ")\n",
    "classifier.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "classifier.set_learning(False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "classifier.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "source": [
    "That's already it! Isn't it truly easy to apply Online ML with StreamPipes and River? Please go ahead and apply it to your own use cases. We would be happy to hear about them!"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don’t hesitate and jump to our [next tutorial](../5-applying-interoperable-machine-learning-in-streampipes) on applying interoperable machine learning models with StreamPipes Python and [ONNX](https://onnx.ai/)."
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "How do you like this tutorial?\n",
    "We hope you like it and would love to receive some feedback from you.\n",
    "Just go to our [GitHub discussion page](https://github.com/apache/streampipes/discussions) and let us know your impression.\n",
    "We'll read and react to them all, we promise!"
   ],
   "metadata": {}
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.7"
  },
  "license": "https://www.apache.org/licenses/LICENSE-2.0",
  "orig_nbformat": 4,
  "vscode": {
   "interpreter": {
    "hash": "5b0c980f854fedc946bf92df903f8f9d5dc7805e8fffb7a1ceb21db6f8b983e3"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
